This notebook shows how to run a Spark Streaming application interactively from a notebook. There are several limitations to be aware of:
The source code is available at https://github.com/ibm-watson-data-lab/spark.samples/tree/master/streaming-twitter. The following cell uses a pre-built jar posted on GitHub, but you can replace it with the URL of your own build if needed.
In [ ]:
%AddJar https://github.com/DTAIEB/demos/raw/master/streaming-twitter-assembly-1.6.jar -f
In [ ]:
val demo = com.ibm.cds.spark.samples.MessageHubStreamingTwitter
val config = demo.getConfig()
// Watson Tone Analyzer service
config.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer-beta/api")
config.setConfig("watson.tone.password","XXXX")
config.setConfig("watson.tone.username","XXXX")
// Message Hub/Kafka service
config.setConfig("bootstrap.servers", Seq(
  "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
  "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093",
  "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093",
  "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093",
  "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093"
).mkString(","))
config.setConfig("api_key","XXXX")
config.setConfig("kafka.topic.tweet","twitter-spark")
config.setConfig("kafka.user.name","XXXX")
config.setConfig("kafka.user.password","XXXX")
config.setConfig("kafka_rest_url","https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")
// Spark Streaming checkpointing configuration with an Object Storage (Swift) container
config.setConfig("name","spark")
config.setConfig("auth_url","https://identity.open.softlayer.com")
config.setConfig("project_id","XXXX")
config.setConfig("region","dallas")
config.setConfig("user_id","XXXX")
config.setConfig("password","XXXX")
config.setConfig("checkpointDir", "swift://notebooks.spark/ssc")
Optional: run the following cell only if your MessageConnect service is not working.
It configures your Twitter credentials and calls the code that connects to Twitter, fetches tweets, and sends them to Message Hub for consumption (refer to the tutorial for more information).
In [ ]:
config.setConfig("twitter4j.oauth.consumerKey","XXXX")
config.setConfig("twitter4j.oauth.consumerSecret","XXXX")
config.setConfig("twitter4j.oauth.accessToken","XXXX")
config.setConfig("twitter4j.oauth.accessTokenSecret","XXXX")
val twitterStream = com.ibm.cds.spark.samples.KafkaProducerTest.createTwitterStream(config)
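Both configuration cells use XXXX as a placeholder for secrets. A forgotten placeholder usually only shows up later, as an authentication failure inside the running stream. The following is a minimal sketch, assuming your notebook environment lets you set environment variables and you prefer to keep credentials out of the notebook. It reads each setting from an environment variable derived from the key name and lists the keys that are still unset. `CredentialCheck` and its `TWITTER4J_OAUTH_CONSUMERKEY`-style naming convention are hypothetical and are not defined by the sample project.

```scala
import java.util.Locale

object CredentialCheck {
  // Placeholder value used for secrets in the configuration cells.
  val Placeholder = "XXXX"

  // Hypothetical convention: "twitter4j.oauth.consumerKey" -> "TWITTER4J_OAUTH_CONSUMERKEY".
  def envName(key: String): String =
    key.toUpperCase(Locale.ROOT).replaceAll("[^A-Z0-9]", "_")

  // Look up each key in the environment, falling back to the placeholder when it is missing.
  def fromEnv(keys: Seq[String], env: Map[String, String] = sys.env): Map[String, String] =
    keys.map(k => k -> env.getOrElse(envName(k), Placeholder)).toMap

  // Keys whose value is empty or still the placeholder, sorted for stable output.
  def unresolved(settings: Map[String, String]): Seq[String] =
    settings.collect { case (k, v) if v.trim.isEmpty || v == Placeholder => k }.toSeq.sorted
}
```

Once `CredentialCheck.unresolved` returns an empty list, the values can be passed on with the same setter the notebook already uses, e.g. `CredentialCheck.fromEnv(keys).foreach { case (k, v) => config.setConfig(k, v) }`. The same check works for the Kafka and Swift keys ("kafka.user.password" maps to KAFKA_USER_PASSWORD, "api_key" to API_KEY).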
Start a new Twitter stream that collects live tweets and enriches them with sentiment analysis scores. The stream runs for the duration given as the second argument of the startTwitterStreaming method. If no duration is specified, the stream runs until the stopTwitterStreaming method is called. The cell below passes only sc, so the stream keeps running until you run demo.stopTwitterStreaming in the last cell.
In [ ]:
demo.startTwitterStreaming(sc)
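The duration rule described above is a common pattern: stop after a given time, or keep running until an explicit stop when no duration is given. The sketch below illustrates it in plain Scala with a scheduled executor. It is not the sample project's implementation, and `StopTimer` and `scheduleStop` are hypothetical names. As an assumption, it treats a zero, negative or infinite duration as "no duration".

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, ThreadFactory, TimeUnit}
import scala.concurrent.duration._

object StopTimer {
  // A single daemon thread, so a pending timer never keeps the JVM alive on its own.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "stream-stop-timer")
        t.setDaemon(true)
        t
      }
    })

  // Schedule `stop` after a finite, positive duration; otherwise schedule nothing.
  def scheduleStop(stopAfter: Duration)(stop: () => Unit): Option[ScheduledFuture[_]] =
    stopAfter match {
      case d: FiniteDuration if d > Duration.Zero =>
        val task = new Runnable { def run(): Unit = stop() }
        Some(scheduler.schedule(task, d.toMillis, TimeUnit.MILLISECONDS))
      case _ =>
        None // zero, negative, infinite or undefined: leave stopping to a manual call
    }
}
```

With the sample project itself, a time limit is presumably passed as a Spark Streaming Duration, for example `demo.startTwitterStreaming(sc, Seconds(40))` after `import org.apache.spark.streaming.Seconds`. This is an assumption based on the description above, so check the method signature in the project source before relying on it.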
In [ ]:
com.ibm.cds.spark.samples.KafkaProducerTest.closeTwitterStream
In [ ]:
demo.stopTwitterStreaming
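The last two cells shut the pipeline down in two steps. The first closes the Twitter-to-Message Hub producer (closeTwitterStream, needed only if you ran the optional cell), and the second stops the Spark Streaming consumer (stopTwitterStreaming). If you script this shutdown, you may want the second step to run even when the first throws. The following is a minimal sketch in plain Scala. `Shutdown.shutdownInOrder` is a hypothetical helper that runs named steps in order and collects failures instead of stopping at the first one.

```scala
import scala.util.{Failure, Success, Try}

object Shutdown {
  // Run each named step in order; a failing step is recorded and the remaining steps still run.
  def shutdownInOrder(steps: (String, () => Unit)*): Seq[(String, Throwable)] =
    steps.flatMap { case (name, step) =>
      Try(step()) match {
        case Success(_) => None
        case Failure(e) => Some(name -> e)
      }
    }
}
```

In the notebook, the two steps would wrap calls to com.ibm.cds.spark.samples.KafkaProducerTest.closeTwitterStream and demo.stopTwitterStreaming, in that order. An empty result means both stopped cleanly. Note that `Try` only captures non-fatal exceptions, so errors such as `OutOfMemoryError` still propagate.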